In this notebook, you will practice working with generative models, using both normalising flow networks and the variational autoencoder algorithm. You will create a synthetic dataset with a normalising flow with randomised parameters. This dataset will then be used to train a variational autoencoder, and you will use the trained model to interpolate between the generated images. You will use concepts from throughout this course, including Distribution objects, probabilistic layers, bijectors, ELBO optimisation and KL divergence regularisers.
This project is peer-assessed. Within this notebook you will find instructions in each section for how to complete the project. Pay close attention to the instructions as the peer review will be carried out according to a grading rubric that checks key parts of the project instructions. Feel free to add extra cells into the notebook as required.
When you have completed the Capstone project notebook, you will submit a pdf of the notebook for peer review. First ensure that the notebook has been fully executed from beginning to end, and all of the cell outputs are visible. This is important, as the grading rubric depends on the reviewer being able to view the outputs of your notebook. Save the notebook as a pdf (File -> Download as -> PDF via LaTeX). You should then submit this pdf for review.
We'll start by running some imports below. For this project you are free to make further imports throughout the notebook as you wish.
#!pip install --upgrade --user tensorflow
#!pip install --upgrade --user tensorflow_probability
#!pip install matplotlib==3.2.2
import tensorflow as tf
import tensorflow_probability as tfp
tfd = tfp.distributions
tfb = tfp.bijectors
tfpl = tfp.layers
import numpy as np
import matplotlib.pyplot as plt
%matplotlib inline

For the capstone project, you will create your own image dataset from contour plots of a transformed distribution using a random normalising flow network. You will then use the variational autoencoder algorithm to train generative and inference networks, and synthesise new images by interpolating in the latent space.
The complete normalising flow is given by the following chain of transformations:
The transformed random variable $x$ is given by $x = f_5(f_4(f_3(f_2(f_1(z)))))$.
# You can use tfb.Chain and tfb.TransformedDistribution to construct the final
# transformed distribution, and log_det_jacobian methods for any subclassed
# bijectors that you write.

def plot_distribution(samples, ax, title, col='red'):
    """Scatter-plot a (N, 2) array of 2D samples on `ax` over a black background."""
    xs, ys = samples[:, 0], samples[:, 1]
    ax.set_facecolor("black")
    ax.scatter(xs, ys, marker='.', c=col, alpha=0.5)
    ax.set(xlim=[-1, 1], ylim=[-1, 1])
    ax.set_title(title, size=15)
class Degree2Polynomial(tfb.Bijector):
    """Bijector implementing f3(z) = (z1, z2 + a * z1^2).

    The Jacobian of this map is lower triangular with ones on the diagonal,
    so its determinant is identically 1 and the log-det-Jacobian is 0.
    """

    def __init__(self, a):
        # Polynomial coefficient applied to the squared first coordinate.
        self.a = a
        super(Degree2Polynomial, self).__init__(forward_min_event_ndims=1,
                                                is_constant_jacobian=True)

    def _forward(self, x):
        first = x[..., :1]
        second = x[..., 1:] + self.a * tf.square(first)
        return tf.concat([first, second], axis=-1)

    def _inverse(self, y):
        first = y[..., :1]
        second = y[..., 1:] - self.a * tf.square(first)
        return tf.concat([first, second], axis=-1)

    def _forward_log_det_jacobian(self, x):
        # Unit-determinant transformation: log|det J| = 0.
        return tf.constant(0., dtype=x.dtype)
class Rotation(tfb.Bijector):
    """Bijector implementing f4(z) = R z, a rotation by angle theta (radians)."""

    def __init__(self, theta):
        c, s = np.cos(theta), np.sin(theta)
        # 2x2 rotation matrix.
        self.R = tf.constant([[c, -s],
                              [s, c]], dtype=tf.float32)
        super(Rotation, self).__init__(forward_min_event_ndims=1,
                                       is_constant_jacobian=True)

    def _forward(self, x):
        return tf.linalg.matvec(self.R, x)

    def _inverse(self, y):
        # A rotation matrix is orthogonal: its inverse is its transpose.
        return tf.linalg.matvec(tf.transpose(self.R), y)

    def _forward_log_det_jacobian(self, x):
        # Rotations preserve volume: log|det R| = 0.
        return tf.constant(0., x.dtype)
def get_normalizing_flow_dist(a, theta, base_dist=None):
    """Build the transformed distribution x = f5(f4(f3(f2(f1(z))))).

    Args:
        a: coefficient of the degree-2 polynomial bijector f3.
        theta: rotation angle in radians for f4.
        base_dist: optional base Distribution for z. Defaults to the
            module-level `base_distribution` (defined in a later notebook
            cell, but before this function is ever called), preserving the
            original behaviour while removing the hard dependency on the
            global when a distribution is passed explicitly.

    Returns:
        A tfd.TransformedDistribution for the transformed variable x.
    """
    bijectors = [
        tfb.Shift([0., -2]),    # f1: shift
        tfb.Scale([1, 1/2]),    # f2: elementwise scale
        Degree2Polynomial(a),   # f3: (z1, z2 + a*z1^2)
        Rotation(theta),        # f4: rotation by theta
        tfb.Tanh()              # f5: squash into (-1, 1)
    ]
    # tfb.Chain applies bijectors right-to-left, so reverse the list to
    # obtain f5 o f4 o f3 o f2 o f1.
    flow_bijector = tfb.Chain(list(reversed(bijectors)))
    if base_dist is None:
        base_dist = base_distribution
    return tfd.TransformedDistribution(distribution=base_dist,
                                       bijector=flow_bijector)
# Sample from the base distribution and visualise it.
nsamples= 10000
sigma = 0.3
# Isotropic 2D Gaussian base distribution z ~ N(0, sigma^2 I).
base_distribution = tfd.MultivariateNormalDiag(loc=tf.zeros(2), scale_diag=sigma*tf.ones(2))
samples = base_distribution.sample(nsamples)
fig, ax = plt.subplots(figsize=(8,8))
plot_distribution(samples, ax, 'Base distribution', 'blue')
plt.show()
# Draw four random (a, theta) parameter settings and plot the resulting
# transformed distributions side by side.
fig, axes = plt.subplots(nrows=2, ncols=2, figsize=(15, 15))
plt.subplots_adjust(0, 0, 1, 0.925, 0.05, 0.05)
colors = ['red', 'green', 'orange', 'magenta']
for ax, col in zip(axes.flatten(), colors):
    a = tfd.Normal(loc=3, scale=1).sample(1)[0].numpy()
    theta = tfd.Uniform(low=0, high=2 * np.pi).sample(1)[0].numpy()
    flow_dist = get_normalizing_flow_dist(a, theta)
    plot_distribution(flow_dist.sample(nsamples), ax,
                      r'$\theta$={:.02f}, a={:.02f}'.format(theta, a), col)
plt.suptitle('Transformed Distribution with Normalizing Flow', size=20)
plt.show()
You may find the `get_densities` function useful: it calculates density values for a (batched) Distribution, for use in a contour plot. Your image dataset should have shape (N, 36, 36, 3), where each image corresponds to a contour plot of a transformed distribution from a normalising flow with an independently sampled set of parameters $s, T, S, b$. It will take a few minutes to create the dataset. Together with the `get_densities` function, the `get_image_array_from_density_values` function will help you to generate the dataset.
# Helper function to compute transformed distribution densities
# Grid of (x, y) points covering [-1, 1]^2, packed into an array of shape
# (100, 100, 2) so densities can be evaluated over the whole grid at once.
X, Y = np.meshgrid(np.linspace(-1, 1, 100), np.linspace(-1, 1, 100))
inputs = np.transpose(np.stack((X, Y)), [1, 2, 0])
def get_densities(transformed_distribution):
    """
    Compute density values of a (batched) Distribution on the global grid.

    Takes a (batched) Distribution object as an argument and returns a numpy
    array Z of shape (batch_shape, 100, 100) of density values, usable with:
        plt.contourf(X, Y, Z[b, ...], cmap='hot', levels=100)
    where b is an index into the batch shape.
    """
    batch_rank = len(transformed_distribution.batch_shape)
    # Insert a broadcasting axis so the (100, 100, 2) grid is evaluated
    # against every batch member simultaneously.
    densities = transformed_distribution.prob(np.expand_dims(inputs, 2))
    # Move the batch axes in front of the two grid axes.
    perm = list(range(2, 2 + batch_rank)) + [0, 1]
    return np.transpose(densities, perm)
# Helper function to convert contour plots to numpy arrays
import numpy as np
from matplotlib.backends.backend_agg import FigureCanvasAgg as FigureCanvas
from matplotlib.figure import Figure
def get_image_array_from_density_values(Z):
    """
    This function takes a numpy array Z of density values of shape (100, 100)
    and returns an integer numpy array of shape (36, 36, 3) of pixel values
    for an image, rendered as a 'hot'-colormap contour plot.
    """
    assert Z.shape == (100, 100)
    # 0.5 inch at the default 72 dpi gives the 36x36 pixel output size.
    fig = Figure(figsize=(0.5, 0.5))
    canvas = FigureCanvas(fig)
    ax = fig.gca()
    ax.contourf(X, Y, Z, cmap='hot', levels=100)
    ax.axis('off')
    fig.tight_layout(pad=0)
    ax.margins(0)
    # Render the figure so the RGB buffer below is populated.
    fig.canvas.draw()
    # NOTE(review): tostring_rgb was removed in newer matplotlib releases;
    # this notebook pins matplotlib==3.2.2 above, where it is available.
    image_from_plot = np.frombuffer(fig.canvas.tostring_rgb(), dtype=np.uint8)
    # Buffer is flat RGB bytes; reshape to (height, width, 3).
    image_from_plot = image_from_plot.reshape(fig.canvas.get_width_height()[::-1] + (3,))
    return image_from_plot
# Sanity check: render four random flows as 36x36 contour-plot images.
plt.figure(figsize=(5, 5))
plt.subplots_adjust(0, 0, 1, 0.95, 0.05, 0.08)
for i in range(4):
    a = tfd.Normal(loc=3, scale=1).sample(1)[0].numpy()
    theta = tfd.Uniform(low=0, high=2 * np.pi).sample(1)[0].numpy()
    # Batch of size 1 so get_densities returns shape (1, 100, 100).
    flow_dist = tfd.BatchReshape(get_normalizing_flow_dist(a, theta), [1])
    image = get_image_array_from_density_values(get_densities(flow_dist).squeeze())
    plt.subplot(2, 2, i + 1)
    plt.imshow(image)
    plt.axis('off')
    plt.title(r'$\theta$={:.02f}, a={:.02f}'.format(theta, a), size=10)
plt.show()
# Build the dataset: N images, each a contour plot of a flow with an
# independently sampled (a, theta) pair. This takes a few minutes.
N = 1000
image_dataset = np.zeros((N, 36, 36, 3))
for i in range(N):
    a = tfd.Normal(loc=3, scale=1).sample(1)[0].numpy()
    theta = tfd.Uniform(low = 0, high = 2*np.pi).sample(1)[0].numpy()
    # Wrap in a batch of size 1 so get_densities returns shape (1, 100, 100).
    transformed_distribution = tfd.BatchReshape(get_normalizing_flow_dist(a, theta), [1])
    image_dataset[i,...] = get_image_array_from_density_values(get_densities(transformed_distribution).squeeze())
image_dataset = tf.convert_to_tensor(image_dataset, dtype=tf.float32)
image_dataset.shape
# Display a random selection of 20 images from the generated dataset.
plt.figure(figsize=(20, 4))
plt.subplots_adjust(0, 0, 1, 0.95, 0.05, 0.08)
indices = np.random.choice(N, 20)
for i in range(20):
    img = image_dataset[indices[i]].numpy()
    plt.subplot(2, 10, i + 1)
    plt.imshow(img / img.max())
    plt.axis('off')
plt.show()
# Build tf.data.Dataset objects for training and validation data: normalise
# pixel values into [0, 1], duplicate each image as (input, target), split
# 80/20, and batch with drop_remainder=True.
n = len(image_dataset)
tf_image_dataset = tf.data.Dataset.from_tensor_slices(image_dataset)
# Normalise each image by its own maximum so pixel values lie in [0, 1].
tf_image_dataset = tf_image_dataset.map(lambda x : x / tf.reduce_max(x))
# Autoencoder training target is the input itself.
tf_image_dataset = tf_image_dataset.map(lambda x: (x, x))
train_sz = int(0.8*n)
# FIX: split BEFORE shuffling. The original pipeline shuffled first and then
# used take/skip; since shuffle reshuffles on every epoch by default, examples
# leaked between the training and validation splits across epochs. Shuffling
# only the training split (with a buffer covering the whole split) keeps the
# two sets disjoint and properly shuffled.
training = tf_image_dataset.take(train_sz).shuffle(train_sz)
validation = tf_image_dataset.skip(train_sz)
training = training.batch(batch_size=20, drop_remainder=True)
validation = validation.batch(batch_size=20, drop_remainder=True)
training.element_spec
You should use the Sequential class to define the encoder and decoder networks below.
from tensorflow.keras.models import Sequential, Model
from tensorflow.keras.layers import (Dense, Flatten, Reshape, Concatenate, Conv2D, UpSampling2D, BatchNormalization)
latent_dim = 2 #50
# Standard-normal prior over the latent space, p(z) = N(0, I).
prior = tfd.MultivariateNormalDiag(loc=tf.zeros(latent_dim))
def get_kl_regularizer(prior_distribution):
    """Return a KL-divergence regulariser towards the given prior.

    The KL term KL(q(z|x) || p(z)) is estimated by Monte Carlo using 3
    sample points from the posterior, then reduced over the sample and
    batch axes.
    """
    return tfpl.KLDivergenceRegularizer(prior_distribution,
                                        weight=1.0,
                                        use_exact_kl=False,
                                        test_points_fn=lambda q: q.sample(3),
                                        test_points_reduce_axis=(0,1))
kl_regularizer = get_kl_regularizer(prior)
def get_encoder(latent_dim, kl_regularizer):
    """Build the convolutional encoder network.

    Maps a (36, 36, 3) image to a full-covariance Gaussian posterior
    q(z|x) over the latent space, with the KL regulariser attached as an
    activity regulariser on the distribution layer.
    """
    conv_kwargs = dict(kernel_size=3, activation='relu', padding='same')
    return Sequential([
        Conv2D(filters=32, strides=2, input_shape=(36, 36, 3), **conv_kwargs),
        BatchNormalization(),
        Conv2D(filters=64, strides=2, **conv_kwargs),
        BatchNormalization(),
        Conv2D(filters=128, strides=3, **conv_kwargs),
        BatchNormalization(),
        Flatten(),
        # Parameters (mean + lower-triangular scale) of the posterior.
        Dense(tfpl.MultivariateNormalTriL.params_size(latent_dim)),
        tfpl.MultivariateNormalTriL(latent_dim, activity_regularizer=kl_regularizer),
    ], name='encoder')
def get_decoder(latent_dim):
    """Build the decoder network.

    Maps a latent vector to an IndependentBernoulli distribution over
    (36, 36, 3) pixel values.
    """
    layers = [
        Dense(1152, activation='relu', input_shape=(latent_dim,)),
        Reshape((3, 3, 128)),   # 3x3 feature map with 128 channels
        UpSampling2D(size=(3, 3)),   # -> 9x9
        Conv2D(filters=64, kernel_size=3, activation='relu', padding='same'),
        UpSampling2D(size=(2, 2)),   # -> 18x18
        Conv2D(filters=32, kernel_size=2, activation='relu', padding='same'),
        UpSampling2D(size=(2, 2)),   # -> 36x36
        Conv2D(filters=128, kernel_size=2, activation='relu', padding='same'),
        # Final conv emits per-pixel logits (no activation).
        Conv2D(filters=3, kernel_size=2, activation=None, padding='same'),
        Flatten(),
        tfpl.IndependentBernoulli(event_shape=(36, 36, 3)),
    ]
    return Sequential(layers, name='decoder')
# Instantiate the encoder and decoder and inspect their architectures.
encoder = get_encoder(latent_dim=2, kl_regularizer=kl_regularizer)
#encoder.losses
encoder.summary()
tf.keras.utils.plot_model(encoder, to_file='encoder.png', show_shapes=True)
decoder = get_decoder(latent_dim=2)
decoder.summary()
tf.keras.utils.plot_model(decoder, to_file='decoder.png', show_shapes=True)
def reconstruction_loss(batch_of_images, decoding_dist):
    """Negative reconstruction log-likelihood, averaged over the batch.

    Together with the encoder's KL activity regulariser this yields the
    (negative) ELBO objective.
    """
    log_likelihoods = decoding_dist.log_prob(batch_of_images)
    return -tf.reduce_mean(log_likelihoods)
# Connect the encoder and decoder with the Model class, compile with the
# reconstruction loss (the KL term is added automatically through the
# encoder's activity regulariser), and train with the fit method using the
# training and validation Datasets.
vae = Model(inputs=encoder.inputs, outputs=decoder(encoder.outputs))
optimizer = tf.keras.optimizers.Adam(learning_rate=0.0005)
vae.compile(optimizer=optimizer, loss=reconstruction_loss)
tf.keras.utils.plot_model(vae, to_file='vae.png', show_shapes=True)
history = vae.fit(training, validation_data=validation, epochs=20)
# Plot the training and validation loss curves.
# FIX: derive the epoch count from the recorded history instead of
# hard-coding 20, so this cell stays correct if the epochs argument to
# vae.fit is changed (or training stops early).
nepochs = len(history.history['loss'])
plt.figure(figsize=(8,5))
plt.plot(range(nepochs), history.history['loss'], label='train-loss')
plt.plot(range(nepochs), history.history['val_loss'], label='valid-loss')
plt.legend()
plt.xlabel('epochs')
plt.ylabel('loss')
plt.show()
def reconstruct(encoder, decoder, batch_of_images):
    """Deterministically reconstruct a batch of images.

    Encodes to the posterior mean, decodes that latent point, and returns
    the mean of the decoding distribution (rather than a random sample).
    """
    posterior = encoder(batch_of_images)
    decoding_dist = decoder(posterior.mean())
    return decoding_dist.mean()
# Scatter-plot the latent embeddings (posterior means) of the whole dataset.
# FIX: normalise each image by its own maximum to match the preprocessing in
# the tf.data pipeline; the original divided by 255, which is inconsistent
# with how the training inputs were scaled.
normalised_images = image_dataset / tf.reduce_max(image_dataset, axis=[1, 2, 3], keepdims=True)
embedding = encoder(normalised_images).mean()
fig, ax = plt.subplots(figsize=(8,8))
plt.scatter(embedding[:,0], embedding[:,1], c='red', s=50, edgecolor='k')
plt.title('Embedding', size=20)
plt.show()
# Show original images (left column) against their VAE reconstructions
# (right column) for four random dataset members.
plt.figure(figsize=(6, 12))
plt.subplots_adjust(0, 0, 1, 0.95, 0.05, 0.08)
indices = np.random.choice(len(image_dataset), 4)
for row in range(4):
    image = image_dataset[indices[row]].numpy()
    image = image / image.max()
    plt.subplot(4, 2, 2 * row + 1)
    plt.imshow(image)
    plt.axis('off')
    reconstructions = reconstruct(encoder, decoder, np.expand_dims(image, axis=0))
    plt.subplot(4, 2, 2 * row + 2)
    plt.imshow(reconstructions[0].numpy())
    plt.axis('off')
plt.suptitle('original (left column) vs. VAE-reconstructed (right column)', size=15)
plt.show()
# Sample points in the latent space and decode them into images.
nsample = 6
samples = np.random.uniform(-10, 10, (nsample, latent_dim))  # alternatively: prior.sample(6)
fig, ax = plt.subplots(figsize=(8, 8))
plt.scatter(samples[:, 0], samples[:, 1], color='blue')
for i in range(nsample):
    plt.text(samples[i, 0] + 0.05, samples[i, 1] + 0.05, 'embedding {}'.format(i), fontsize=15)
plt.title('Embeddings', size=20)
plt.show()
# Decode the sampled latent points; take the mean of the Bernoulli decoding
# distribution as the image.
reconstructions = decoder(samples).mean()
plt.figure(figsize=(8, 6))
plt.subplots_adjust(0, 0, 1, 0.9, 0.05, 0.08)
# FIX: removed an unused `indices = np.random.choice(...)` assignment that
# was left over from the previous cell and never read here.
for i in range(nsample):
    plt.subplot(2, 3, i + 1)
    plt.imshow(reconstructions[i])
    plt.title('image {}'.format(i))
    plt.axis('off')
plt.suptitle('VAE-reconstructions', size=20)
plt.show()
# Function to create animation
import matplotlib.animation as anim
from IPython.display import HTML
def get_animation(latent_size, decoder, interpolation_length=500):
    """Animate a smooth path through the latent space alongside the decoded images.

    Left panel: a marker tracing the first two latent coordinates.
    Right panel: the decoder's mean image at the current latent point.

    Args:
        latent_size: dimensionality of the latent space (must be >= 2 so the
            first two coordinates can be plotted).
        decoder: trained decoder mapping a latent batch to an image distribution.
        interpolation_length: number of animation frames.

    Returns:
        A matplotlib.animation.FuncAnimation object.
    """
    assert latent_size >= 2, "Latent space must be at least 2-dimensional for plotting"
    fig = plt.figure(figsize=(9, 4))
    ax1 = fig.add_subplot(1,2,1)
    ax1.set_xlim([-10, 10])
    ax1.set_ylim([-10, 10])
    ax1.set_title("Latent space")
    ax1.axes.get_xaxis().set_visible(False)
    ax1.axes.get_yaxis().set_visible(False)
    ax2 = fig.add_subplot(1,2,2)
    ax2.set_title("Data space")
    ax2.axes.get_xaxis().set_visible(False)
    ax2.axes.get_yaxis().set_visible(False)
    # initializing a line variable (a single marker tracking the latent point)
    line, = ax1.plot([], [], marker='o')
    # Placeholder image; updated in-place each frame via set_data.
    img2 = ax2.imshow(np.zeros((36, 36, 3)))
    # Each latent coordinate follows a sinusoid with its own random frequency
    # and phase, giving a smooth tour of the latent space within [-10, 10].
    freqs = np.random.uniform(low=0.1, high=0.2, size=(latent_size,))
    phases = np.random.randn(latent_size)
    input_points = np.arange(interpolation_length)
    latent_coords = []
    for i in range(latent_size):
        latent_coords.append(10 * np.sin((freqs[i]*input_points + phases[i])).astype(np.float32))
    def animate(i):
        # Current latent point: frame i's value from each coordinate's sinusoid.
        z = tf.constant([coord[i] for coord in latent_coords])
        img_out = np.squeeze(decoder(z[np.newaxis, ...]).mean().numpy())
        line.set_data(z.numpy()[0], z.numpy()[1])
        img2.set_data(np.clip(img_out, 0, 1))
        # Return the artists that changed so blitting can redraw only them.
        return (line, img2)
    return anim.FuncAnimation(fig, animate, frames=interpolation_length,
                              repeat=False, blit=True, interval=300)
# Create the animation
latent_size = 2
# 2000 frames of latent-space interpolation; rendering to HTML5 video below
# can take a while.
a = get_animation(latent_size, decoder, interpolation_length=2000)
HTML(a.to_html5_video())